# hpc_runner_node.py
from typing import List
import os
import subprocess
import json
from pydantic import BaseModel, Field
import re
from utils import (
save_file, remove_files, remove_file,
run_command, check_foam_errors, retrieve_faiss, remove_numeric_folders
)
def extract_cluster_info(state) -> dict:
    """
    Extract cluster information from the user requirement using the LLM.

    If the case directory contains system/decomposeParDict, its content is
    included in the prompt so the LLM can derive ntasks_per_node from the
    decomposition settings.

    Args:
        state: Current graph state; must provide "user_requirement",
            "case_dir", and an "llm_service" exposing
            invoke(user_prompt, system_prompt) -> str.

    Returns:
        dict: Keys cluster_name, account_number, partition, nodes,
            ntasks_per_node, time_limit, memory. Missing/None values are
            filled with defaults; on parse failure all defaults are returned.
    """
    user_requirement = state["user_requirement"]
    case_dir = state["case_dir"]
    # Single source of truth for fallback values: used both to fill gaps in
    # the LLM answer and as the complete fallback when parsing fails.
    defaults = {
        'cluster_name': 'default_cluster',
        'account_number': 'default_account',
        'partition': 'normal',
        'nodes': 1,
        'ntasks_per_node': 1,
        'time_limit': 24,
        'memory': 64
    }
    # Check if decomposeParDict exists and read its content.
    decompose_par_dict_content = ""
    decompose_par_dict_path = os.path.join(case_dir, "system", "decomposeParDict")
    if os.path.exists(decompose_par_dict_path):
        try:
            with open(decompose_par_dict_path, 'r') as f:
                decompose_par_dict_content = f.read()
        except Exception as e:
            # Best-effort: the prompt still works without the dict content.
            print(f"Warning: Could not read decomposeParDict: {e}")
    system_prompt = (
        "You are an expert in HPC cluster analysis. "
        "Analyze the user requirement to extract cluster information. "
        "Look for keywords like: cluster name, account number, partition, queue, "
        "specific cluster names (e.g., Stampede2, Frontera, Summit, etc.), "
        "account numbers, project codes, or any mention of specific HPC systems. "
        ""
        "IMPORTANT: If a decomposeParDict file is provided, analyze it to determine "
        "the appropriate number of tasks per node (ntasks_per_node) based on the "
        "decomposition settings. The number of tasks should match the total number "
        "of subdomains or processes specified in the decomposeParDict."
        ""
        "Return a JSON object with the following structure: "
        "{"
        "  'cluster_name': 'name of the cluster or HPC system', "
        "  'account_number': 'account number or project code', "
        "  'partition': 'partition name (e.g., normal, debug, gpu)', "
        "  'nodes': 'number of nodes (default: 1)', "
        "  'ntasks_per_node': 'number of tasks per node (determine from decomposeParDict if available)', "
        "  'time_limit': 'time limit in hours (default: 24)', "
        "  'memory': 'memory per node in GB (default: 64)'"
        "}"
        "If any information is not specified, use reasonable defaults based on your expertise. "
        "Only return valid JSON. Don't include any other text."
    )
    user_prompt = (
        f"User requirement: {user_requirement}\n\n"
    )
    if decompose_par_dict_content:
        user_prompt += (
            f"decomposeParDict content:\n{decompose_par_dict_content}\n\n"
            "Analyze the decomposeParDict to determine the appropriate number of tasks per node "
            "based on the decomposition settings. "
        )
    user_prompt += "Extract cluster information and return as JSON object."
    response = state["llm_service"].invoke(user_prompt, system_prompt)
    try:
        # Strip optional markdown fences: handles both ```json and bare ```.
        response = response.strip()
        if response.startswith('```json'):
            response = response[7:]
        elif response.startswith('```'):
            response = response[3:]
        if response.endswith('```'):
            response = response[:-3]
        response = response.strip()
        cluster_info = json.loads(response)
        # json.loads can yield a list/str/number; only an object is usable.
        if not isinstance(cluster_info, dict):
            raise ValueError(f"Expected JSON object, got {type(cluster_info).__name__}")
        # Fill in any missing or null values with defaults.
        for key, default_value in defaults.items():
            if cluster_info.get(key) is None:
                cluster_info[key] = default_value
        return cluster_info
    except (json.JSONDecodeError, ValueError, KeyError) as e:
        print(f"Error parsing cluster info from LLM response: {e}")
        print(f"LLM response: {response}")
        # Return default values if parsing fails (copy so callers can mutate).
        return dict(defaults)
def create_slurm_script(case_dir: str, cluster_info: dict, state) -> str:
    """
    Create a SLURM script for an OpenFOAM simulation using the LLM.

    Args:
        case_dir: Directory containing the OpenFOAM case.
        cluster_info: Cluster configuration with keys cluster_name,
            account_number, partition, nodes, ntasks_per_node, time_limit,
            memory (as produced by extract_cluster_info).
        state: Current graph state providing "llm_service".

    Returns:
        str: Path to the created SLURM script (case_dir/submit_job.slurm).
    """
    system_prompt = (
        "You are an expert in HPC cluster job submission and SLURM scripting. "
        "Create a complete SLURM script for running OpenFOAM simulations. "
        "The script should include:"
        "1. Proper SLURM directives (#SBATCH) based on the cluster information provided"
        "2. Do not load openfoam"
        # Typo fix: "libaraies" -> "libraries" so the LLM prompt reads cleanly.
        "3. Load libraries for OpenFOAM to run in parallel"
        "4. Directory navigation and execution of the Allrun script"
        "5. Error handling and status reporting"
        "6. Any cluster-specific optimizations or requirements"
        "7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript."
        ""
        "Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting."
        "Make sure the script is executable and follows best practices for the specified cluster."
    )
    user_prompt = (
        f"Create a SLURM script for OpenFOAM simulation with the following parameters:\n"
        f"Cluster: {cluster_info['cluster_name']}\n"
        f"Account: {cluster_info['account_number']}\n"
        f"Partition: {cluster_info['partition']}\n"
        f"Nodes: {cluster_info['nodes']}\n"
        f"Tasks per node: {cluster_info['ntasks_per_node']}\n"
        f"Time limit: {cluster_info['time_limit']} hours\n"
        f"Memory: {cluster_info['memory']} GB per node\n"
        f"Case directory: {case_dir}\n"
        f""
        f"Generate a complete SLURM script that will run the OpenFOAM simulation using the Allrun script."
    )
    response = state["llm_service"].invoke(user_prompt, system_prompt)
    # Strip optional markdown fences from the LLM response.
    script_content = response.strip()
    if script_content.startswith('```bash'):
        script_content = script_content[7:]
    elif script_content.startswith('```'):
        script_content = script_content[3:]
    if script_content.endswith('```'):
        script_content = script_content[:-3]
    script_content = script_content.strip()
    # Ensure the script starts with a shebang. Checking for '#!' (not just
    # '#!/bin/bash') avoids prepending a second shebang above e.g. '#!/bin/sh'.
    if not script_content.startswith('#!'):
        script_content = '#!/bin/bash\n' + script_content
    script_path = os.path.join(case_dir, "submit_job.slurm")
    save_file(script_path, script_content)
    return script_path
def create_slurm_script_with_error_context(case_dir: str, cluster_info: dict, state, error_message: str = "", previous_script_content: str = "") -> str:
    """
    Create a SLURM script for an OpenFOAM simulation using the LLM, feeding
    back the previous failed script and its error message for retries.

    Args:
        case_dir: Directory containing the OpenFOAM case.
        cluster_info: Cluster configuration with keys cluster_name,
            account_number, partition, nodes, ntasks_per_node, time_limit,
            memory.
        state: Current graph state providing "llm_service".
        error_message: Error message from the previous submission attempt.
        previous_script_content: Content of the previous failed SLURM script.

    Returns:
        str: Path to the created SLURM script (case_dir/submit_job.slurm).
    """
    system_prompt = (
        "You are an expert in HPC cluster job submission and SLURM scripting. "
        "Create a complete SLURM script for running OpenFOAM simulations. "
        "The script should include:"
        "1. Proper SLURM directives (#SBATCH) based on the cluster information provided"
        "2. Do not load OpenFOAM"
        # Typo fix: "libaraies" -> "libraries", matching create_slurm_script.
        "3. Load libraries for OpenFOAM to run in parallel"
        "4. Directory navigation and execution of the Allrun script"
        "5. Error handling and status reporting"
        "6. Any cluster-specific optimizations or requirements"
        "7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript."
        ""
        "If a previous script and error message are provided, analyze the error and the script "
        "to identify what went wrong and fix it. Common issues to consider:"
        "- Invalid account numbers or partitions"
        "- Insufficient resources (memory, time, nodes)"
        "- Missing modules or environment variables"
        "- Incorrect file paths or permissions"
        "- Cluster-specific requirements or restrictions"
        "- Syntax errors in SLURM directives"
        "- Incorrect module names or versions"
        ""
        "Compare the previous script with the error message to identify the specific issue "
        "and create a corrected version."
        ""
        "Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting."
        "Make sure the script is executable and follows best practices for the specified cluster."
    )
    user_prompt = (
        f"Create a SLURM script for OpenFOAM simulation with the following parameters:\n"
        f"Cluster: {cluster_info['cluster_name']}\n"
        f"Account: {cluster_info['account_number']}\n"
        f"Partition: {cluster_info['partition']}\n"
        f"Nodes: {cluster_info['nodes']}\n"
        f"Tasks per node: {cluster_info['ntasks_per_node']}\n"
        f"Time limit: {cluster_info['time_limit']} hours\n"
        f"Memory: {cluster_info['memory']} GB per node\n"
        f"Case directory: {case_dir}\n"
    )
    if error_message and previous_script_content:
        user_prompt += f"\nPrevious submission failed with error: {error_message}\n"
        user_prompt += f"Previous SLURM script that failed:\n```bash\n{previous_script_content}\n```\n"
        user_prompt += "Please analyze this error and the previous script to identify the issue and create a corrected version."
    user_prompt += f"\nGenerate a complete SLURM script that will run the OpenFOAM simulation using the Allrun script. Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting."
    response = state["llm_service"].invoke(user_prompt, system_prompt)
    # Strip optional markdown fences from the LLM response.
    script_content = response.strip()
    if script_content.startswith('```bash'):
        script_content = script_content[7:]
    elif script_content.startswith('```'):
        script_content = script_content[3:]
    if script_content.endswith('```'):
        script_content = script_content[:-3]
    script_content = script_content.strip()
    # Ensure the script starts with a shebang. Checking for '#!' (not just
    # '#!/bin/bash') avoids prepending a second shebang above e.g. '#!/bin/sh'.
    if not script_content.startswith('#!'):
        script_content = '#!/bin/bash\n' + script_content
    script_path = os.path.join(case_dir, "submit_job.slurm")
    save_file(script_path, script_content)
    return script_path
def submit_slurm_job(script_path: str) -> tuple:
    """
    Submit a SLURM job via sbatch and return its job ID.

    Args:
        script_path: Path to the SLURM script to submit.

    Returns:
        tuple: (job_id, success, error_message); job_id is None on failure.
    """
    try:
        completed = subprocess.run(
            ["sbatch", script_path],
            capture_output=True,
            text=True,
            check=True
        )
        # On success sbatch prints "Submitted batch job <id>".
        submit_output = completed.stdout.strip()
        match = re.search(r'Submitted batch job (\d+)', submit_output)
        if match is None:
            return None, False, f"Could not extract job ID from output: {submit_output}"
        return match.group(1), True, ""
    except subprocess.CalledProcessError as e:
        return None, False, f"Failed to submit job: {e.stderr}"
    except Exception as e:
        return None, False, f"Unexpected error: {str(e)}"
def check_job_status(job_id: str) -> tuple:
    """
    Query squeue for the current state of a SLURM job.

    Args:
        job_id: SLURM job ID.

    Returns:
        tuple: (status, success, error_message). An empty squeue listing is
        reported as "COMPLETED", since finished jobs leave the queue.
    """
    squeue_cmd = ["squeue", "-j", job_id, "--noheader", "-o", "%T"]
    try:
        completed = subprocess.run(
            squeue_cmd,
            capture_output=True,
            text=True,
            check=True
        )
        job_state = completed.stdout.strip()
        if not job_state:
            return "COMPLETED", True, ""  # Job not in queue, likely completed
        return job_state, True, ""
    except subprocess.CalledProcessError as e:
        return None, False, f"Failed to check job status: {e.stderr}"
    except Exception as e:
        return None, False, f"Unexpected error: {str(e)}"
def hpc_runner_node(state):
    """
    HPC Runner node: extract cluster info from the user requirement, create a
    SLURM script, submit the job to the cluster, wait for completion, and
    check the OpenFOAM logs for errors.

    Submission is retried up to config.max_loop times; each retry regenerates
    the script with the previous script and error message as context.

    Returns:
        dict: Updated state with error_logs, job_id, cluster_info, and
            slurm_script_path.
    """
    import time  # used for retry backoff and completion polling

    config = state["config"]
    case_dir = state["case_dir"]
    max_loop = config.max_loop
    current_attempt = 0
    print(f"============================== HPC Runner ==============================")
    # Clean up any previous log and error files.
    out_file = os.path.join(case_dir, "Allrun.out")
    err_file = os.path.join(case_dir, "Allrun.err")
    remove_files(case_dir, prefix="log")
    remove_file(err_file)
    remove_file(out_file)
    remove_numeric_folders(case_dir)
    # Extract cluster information from user requirement.
    print("Extracting cluster information from user requirement...")
    cluster_info = extract_cluster_info(state)
    print(f"Cluster info extracted: {cluster_info}")
    # Pre-initialize so a max_loop <= 0 config cannot cause a NameError below.
    job_id = None
    script_path = None
    last_error_msg = ""
    # Submit the job with retry logic.
    while current_attempt < max_loop:
        current_attempt += 1
        print(f"Attempt {current_attempt}/{max_loop}: Creating and submitting SLURM job...")
        # Create SLURM script (regenerate on retry with error context).
        if current_attempt == 1:
            print("Creating initial SLURM script...")
            script_path = create_slurm_script(case_dir, cluster_info, state)
        else:
            print(f"Regenerating SLURM script based on previous error...")
            # Read the previous failed script so the LLM can diagnose it.
            previous_script_content = ""
            try:
                with open(script_path, 'r') as f:
                    previous_script_content = f.read()
            except Exception as e:
                print(f"Warning: Could not read previous script: {e}")
            script_path = create_slurm_script_with_error_context(case_dir, cluster_info, state, last_error_msg, previous_script_content)
        print(f"SLURM script created at: {script_path}")
        job_id, success, error_msg = submit_slurm_job(script_path)
        if success:
            print(f"Job submitted successfully with ID: {job_id}")
            break
        print(f"Attempt {current_attempt} failed: {error_msg}")
        last_error_msg = error_msg  # Store error for the next regeneration
        if current_attempt < max_loop:
            print(f"Retrying in 5 seconds...")
            time.sleep(5)
        else:
            print(f"Maximum attempts ({max_loop}) reached. Job submission failed.")
            error_logs = [f"Job submission failed after {max_loop} attempts. Last error: {error_msg}"]
            return {
                **state,
                "error_logs": error_logs,
                "job_id": None,
                "cluster_info": cluster_info,
                "slurm_script_path": script_path
            }
    # Wait for job completion, polling squeue periodically.
    print("Waiting for job completion...")
    max_wait_time = 3600  # 1 hour timeout
    wait_interval = 30  # Check every 30 seconds
    elapsed_time = 0
    while elapsed_time < max_wait_time:
        status, status_success, status_error = check_job_status(job_id)
        if not status_success:
            print(f"Failed to check job status: {status_error}")
            error_logs = [f"Status check failed: {status_error}"]
            return {
                **state,
                "error_logs": error_logs,
                "job_id": job_id,
                "cluster_info": cluster_info,
                "slurm_script_path": script_path
            }
        print(f"Job status: {status}")
        # Terminal SLURM states: stop polling whether or not the job succeeded.
        if status in ["COMPLETED", "FAILED", "CANCELLED", "TIMEOUT"]:
            print(f"Job finished with status: {status}")
            break
        # Wait before checking again.
        time.sleep(wait_interval)
        elapsed_time += wait_interval
        if elapsed_time % 300 == 0:  # Print progress every 5 minutes
            print(f"Still waiting... ({elapsed_time//60} minutes elapsed)")
    if elapsed_time >= max_wait_time:
        print("Job timeout reached. Assuming job completed.")
    # Check for errors in log files (mirrors local_runner behavior).
    print("Checking for errors in log files...")
    error_logs = check_foam_errors(case_dir)
    if len(error_logs) > 0:
        print("Errors detected in the HPC Allrun execution.")
        print(error_logs)
    else:
        print("HPC Allrun executed successfully without errors.")
    state['loop_count'] += 1
    # Return updated state.
    return {
        **state,
        "error_logs": error_logs,
        "job_id": job_id,
        "cluster_info": cluster_info,
        "slurm_script_path": script_path
    }
|