kabudadada
Add Foam-Agent MCP service with conda environment support
7eb1167
# hpc_runner_node.py
from typing import List
import os
import subprocess
import json
from pydantic import BaseModel, Field
import re
from utils import (
save_file, remove_files, remove_file,
run_command, check_foam_errors, retrieve_faiss, remove_numeric_folders
)
def extract_cluster_info(state) -> dict:
"""
Extract cluster information from user requirement using LLM.
Args:
state: Current graph state containing user requirement and LLM service
Returns:
dict: Dictionary containing cluster_name, account_number, and other cluster details
"""
user_requirement = state["user_requirement"]
case_dir = state["case_dir"]
# Check if decomposeParDict exists and read its content
decompose_par_dict_content = ""
decompose_par_dict_path = os.path.join(case_dir, "system", "decomposeParDict")
if os.path.exists(decompose_par_dict_path):
try:
with open(decompose_par_dict_path, 'r') as f:
decompose_par_dict_content = f.read()
except Exception as e:
print(f"Warning: Could not read decomposeParDict: {e}")
system_prompt = (
"You are an expert in HPC cluster analysis. "
"Analyze the user requirement to extract cluster information. "
"Look for keywords like: cluster name, account number, partition, queue, "
"specific cluster names (e.g., Stampede2, Frontera, Summit, etc.), "
"account numbers, project codes, or any mention of specific HPC systems. "
""
"IMPORTANT: If a decomposeParDict file is provided, analyze it to determine "
"the appropriate number of tasks per node (ntasks_per_node) based on the "
"decomposition settings. The number of tasks should match the total number "
"of subdomains or processes specified in the decomposeParDict."
""
"Return a JSON object with the following structure: "
"{"
" 'cluster_name': 'name of the cluster or HPC system', "
" 'account_number': 'account number or project code', "
" 'partition': 'partition name (e.g., normal, debug, gpu)', "
" 'nodes': 'number of nodes (default: 1)', "
" 'ntasks_per_node': 'number of tasks per node (determine from decomposeParDict if available)', "
" 'time_limit': 'time limit in hours (default: 24)', "
" 'memory': 'memory per node in GB (default: 64)'"
"}"
"If any information is not specified, use reasonable defaults based on your expertise. "
"Only return valid JSON. Don't include any other text."
)
user_prompt = (
f"User requirement: {user_requirement}\n\n"
)
if decompose_par_dict_content:
user_prompt += (
f"decomposeParDict content:\n{decompose_par_dict_content}\n\n"
"Analyze the decomposeParDict to determine the appropriate number of tasks per node "
"based on the decomposition settings. "
)
user_prompt += "Extract cluster information and return as JSON object."
response = state["llm_service"].invoke(user_prompt, system_prompt)
# Try to parse the JSON response
try:
# Clean up the response to extract JSON
response = response.strip()
if response.startswith('```json'):
response = response[7:]
if response.endswith('```'):
response = response[:-3]
response = response.strip()
cluster_info = json.loads(response)
# Set defaults for missing values
defaults = {
'cluster_name': 'default_cluster',
'account_number': 'default_account',
'partition': 'normal',
'nodes': 1,
'ntasks_per_node': 1,
'time_limit': 24,
'memory': 64
}
for key, default_value in defaults.items():
if key not in cluster_info or cluster_info[key] is None:
cluster_info[key] = default_value
return cluster_info
except (json.JSONDecodeError, KeyError) as e:
print(f"Error parsing cluster info from LLM response: {e}")
print(f"LLM response: {response}")
# Return default values if parsing fails
return {
'cluster_name': 'default_cluster',
'account_number': 'default_account',
'partition': 'normal',
'nodes': 1,
'ntasks_per_node': 1,
'time_limit': 24,
'memory': 64
}
def create_slurm_script(case_dir: str, cluster_info: dict, state) -> str:
"""
Create a SLURM script for OpenFOAM simulation using LLM.
Args:
case_dir: Directory containing the OpenFOAM case
cluster_info: Dictionary containing cluster configuration
state: Current graph state containing LLM service
Returns:
str: Path to the created SLURM script
"""
system_prompt = (
"You are an expert in HPC cluster job submission and SLURM scripting. "
"Create a complete SLURM script for running OpenFOAM simulations. "
"The script should include:"
"1. Proper SLURM directives (#SBATCH) based on the cluster information provided"
"2. Do not load openfoam"
"3. Load libaraies for openfoam for run in parallel"
"4. Directory navigation and execution of the Allrun script"
"5. Error handling and status reporting"
"6. Any cluster-specific optimizations or requirements"
"7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript."
""
"Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting."
"Make sure the script is executable and follows best practices for the specified cluster."
)
user_prompt = (
f"Create a SLURM script for OpenFOAM simulation with the following parameters:\n"
f"Cluster: {cluster_info['cluster_name']}\n"
f"Account: {cluster_info['account_number']}\n"
f"Partition: {cluster_info['partition']}\n"
f"Nodes: {cluster_info['nodes']}\n"
f"Tasks per node: {cluster_info['ntasks_per_node']}\n"
f"Time limit: {cluster_info['time_limit']} hours\n"
f"Memory: {cluster_info['memory']} GB per node\n"
f"Case directory: {case_dir}\n"
f""
f"Generate a complete SLURM script that will run the OpenFOAM simulation using the Allrun script."
)
response = state["llm_service"].invoke(user_prompt, system_prompt)
# Clean up the response to extract just the script content
script_content = response.strip()
if script_content.startswith('```bash'):
script_content = script_content[7:]
elif script_content.startswith('```'):
script_content = script_content[3:]
if script_content.endswith('```'):
script_content = script_content[:-3]
script_content = script_content.strip()
# Ensure the script starts with shebang
if not script_content.startswith('#!/bin/bash'):
script_content = '#!/bin/bash\n' + script_content
script_path = os.path.join(case_dir, "submit_job.slurm")
save_file(script_path, script_content)
return script_path
def create_slurm_script_with_error_context(case_dir: str, cluster_info: dict, state, error_message: str = "", previous_script_content: str = "") -> str:
"""
Create a SLURM script for OpenFOAM simulation using LLM, with error context for retries.
Args:
case_dir: Directory containing the OpenFOAM case
cluster_info: Dictionary containing cluster configuration
state: Current graph state containing LLM service
error_message: Error message from previous submission attempt
previous_script_content: Content of the previous failed SLURM script
Returns:
str: Path to the created SLURM script
"""
system_prompt = (
"You are an expert in HPC cluster job submission and SLURM scripting. "
"Create a complete SLURM script for running OpenFOAM simulations. "
"The script should include:"
"1. Proper SLURM directives (#SBATCH) based on the cluster information provided"
"2. Do not load OpenFOAM"
"3. Load libaraies for openfoam for run in parallel"
"4. Directory navigation and execution of the Allrun script"
"5. Error handling and status reporting"
"6. Any cluster-specific optimizations or requirements"
"7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript."
""
"If a previous script and error message are provided, analyze the error and the script "
"to identify what went wrong and fix it. Common issues to consider:"
"- Invalid account numbers or partitions"
"- Insufficient resources (memory, time, nodes)"
"- Missing modules or environment variables"
"- Incorrect file paths or permissions"
"- Cluster-specific requirements or restrictions"
"- Syntax errors in SLURM directives"
"- Incorrect module names or versions"
""
"Compare the previous script with the error message to identify the specific issue "
"and create a corrected version."
""
"Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting."
"Make sure the script is executable and follows best practices for the specified cluster."
)
user_prompt = (
f"Create a SLURM script for OpenFOAM simulation with the following parameters:\n"
f"Cluster: {cluster_info['cluster_name']}\n"
f"Account: {cluster_info['account_number']}\n"
f"Partition: {cluster_info['partition']}\n"
f"Nodes: {cluster_info['nodes']}\n"
f"Tasks per node: {cluster_info['ntasks_per_node']}\n"
f"Time limit: {cluster_info['time_limit']} hours\n"
f"Memory: {cluster_info['memory']} GB per node\n"
f"Case directory: {case_dir}\n"
)
if error_message and previous_script_content:
user_prompt += f"\nPrevious submission failed with error: {error_message}\n"
user_prompt += f"Previous SLURM script that failed:\n```bash\n{previous_script_content}\n```\n"
user_prompt += "Please analyze this error and the previous script to identify the issue and create a corrected version."
user_prompt += f"\nGenerate a complete SLURM script that will run the OpenFOAM simulation using the Allrun script. Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting."
response = state["llm_service"].invoke(user_prompt, system_prompt)
# Clean up the response to extract just the script content
script_content = response.strip()
if script_content.startswith('```bash'):
script_content = script_content[7:]
elif script_content.startswith('```'):
script_content = script_content[3:]
if script_content.endswith('```'):
script_content = script_content[:-3]
script_content = script_content.strip()
# Ensure the script starts with shebang
if not script_content.startswith('#!/bin/bash'):
script_content = '#!/bin/bash\n' + script_content
script_path = os.path.join(case_dir, "submit_job.slurm")
save_file(script_path, script_content)
return script_path
def submit_slurm_job(script_path: str) -> tuple:
"""
Submit a SLURM job and return job ID.
Args:
script_path: Path to the SLURM script
Returns:
tuple: (job_id, success, error_message)
"""
try:
# Submit the job
result = subprocess.run(
["sbatch", script_path],
capture_output=True,
text=True,
check=True
)
# Extract job ID from output
output = result.stdout.strip()
job_id_match = re.search(r'Submitted batch job (\d+)', output)
if job_id_match:
job_id = job_id_match.group(1)
return job_id, True, ""
else:
return None, False, f"Could not extract job ID from output: {output}"
except subprocess.CalledProcessError as e:
return None, False, f"Failed to submit job: {e.stderr}"
except Exception as e:
return None, False, f"Unexpected error: {str(e)}"
def check_job_status(job_id: str) -> tuple:
"""
Check the status of a SLURM job.
Args:
job_id: SLURM job ID
Returns:
tuple: (status, success, error_message)
"""
try:
result = subprocess.run(
["squeue", "-j", job_id, "--noheader", "-o", "%T"],
capture_output=True,
text=True,
check=True
)
status = result.stdout.strip()
if status:
return status, True, ""
else:
return "COMPLETED", True, "" # Job not in queue, likely completed
except subprocess.CalledProcessError as e:
return None, False, f"Failed to check job status: {e.stderr}"
except Exception as e:
return None, False, f"Unexpected error: {str(e)}"
def hpc_runner_node(state):
"""
HPC Runner node: Extract cluster info from user requirement, create SLURM script,
submit job to cluster, wait for completion, and check for errors.
Retries submission on failure up to max_loop times, regenerating script based on errors.
"""
config = state["config"]
case_dir = state["case_dir"]
allrun_file_path = os.path.join(case_dir, "Allrun")
max_loop = config.max_loop
current_attempt = 0
print(f"============================== HPC Runner ==============================")
# Clean up any previous log and error files.
out_file = os.path.join(case_dir, "Allrun.out")
err_file = os.path.join(case_dir, "Allrun.err")
remove_files(case_dir, prefix="log")
remove_file(err_file)
remove_file(out_file)
remove_numeric_folders(case_dir)
# Extract cluster information from user requirement
print("Extracting cluster information from user requirement...")
cluster_info = extract_cluster_info(state)
print(f"Cluster info extracted: {cluster_info}")
# Submit the job with retry logic
while current_attempt < max_loop:
current_attempt += 1
print(f"Attempt {current_attempt}/{max_loop}: Creating and submitting SLURM job...")
# Create SLURM script (regenerate on retry with error context)
if current_attempt == 1:
print("Creating initial SLURM script...")
script_path = create_slurm_script(case_dir, cluster_info, state)
else:
print(f"Regenerating SLURM script based on previous error...")
# Read the previous failed script content
previous_script_content = ""
try:
with open(script_path, 'r') as f:
previous_script_content = f.read()
except Exception as e:
print(f"Warning: Could not read previous script: {e}")
script_path = create_slurm_script_with_error_context(case_dir, cluster_info, state, last_error_msg, previous_script_content)
print(f"SLURM script created at: {script_path}")
job_id, success, error_msg = submit_slurm_job(script_path)
if success:
print(f"Job submitted successfully with ID: {job_id}")
break
else:
print(f"Attempt {current_attempt} failed: {error_msg}")
last_error_msg = error_msg # Store error for next iteration
if current_attempt < max_loop:
print(f"Retrying in 5 seconds...")
import time
time.sleep(5)
else:
print(f"Maximum attempts ({max_loop}) reached. Job submission failed.")
error_logs = [f"Job submission failed after {max_loop} attempts. Last error: {error_msg}"]
return {
**state,
"error_logs": error_logs,
"job_id": None,
"cluster_info": cluster_info,
"slurm_script_path": script_path
}
# Wait for job completion
print("Waiting for job completion...")
import time
max_wait_time = 3600 # 1 hour timeout
wait_interval = 30 # Check every 30 seconds
elapsed_time = 0
while elapsed_time < max_wait_time:
status, status_success, status_error = check_job_status(job_id)
if not status_success:
print(f"Failed to check job status: {status_error}")
error_logs = [f"Status check failed: {status_error}"]
return {
**state,
"error_logs": error_logs,
"job_id": job_id,
"cluster_info": cluster_info,
"slurm_script_path": script_path
}
print(f"Job status: {status}")
# Check if job is completed (either successfully or with error)
if status in ["COMPLETED", "FAILED", "CANCELLED", "TIMEOUT"]:
print(f"Job finished with status: {status}")
break
# Wait before checking again
time.sleep(wait_interval)
elapsed_time += wait_interval
if elapsed_time % 300 == 0: # Print progress every 5 minutes
print(f"Still waiting... ({elapsed_time//60} minutes elapsed)")
if elapsed_time >= max_wait_time:
print("Job timeout reached. Assuming job completed.")
# Check for errors in log files (similar to local_runner)
print("Checking for errors in log files...")
error_logs = check_foam_errors(case_dir)
if len(error_logs) > 0:
print("Errors detected in the HPC Allrun execution.")
print(error_logs)
else:
print("HPC Allrun executed successfully without errors.")
state['loop_count'] += 1
# Return updated state
return {
**state,
"error_logs": error_logs,
"job_id": job_id,
"cluster_info": cluster_info,
"slurm_script_path": script_path
}