import os
import subprocess
import argparse
import concurrent.futures
from pathlib import Path
import re
import json

# Per-file counters shared by read_files_into_dict() and find_cases().
_FILE_STAT_KEYS = (
    "files_total_scanned",
    "files_skipped_encoding",
    "files_skipped_large",
    "files_read_success",
    "allrun_read_success",
    "allrun_read_fail",
)

# Matches an Allrun line such as
#   blockMesh -dict $FOAM_TUTORIALS/resources/blockMesh/<name>
# and captures <name>.
_BLOCKMESH_DICT_PATTERN = re.compile(
    r"blockMesh\s+-dict\s+\$FOAM_TUTORIALS/resources/blockMesh/([\w\d_]+)"
)


def _new_file_stats():
    """Return a fresh per-file statistics dictionary with every counter at 0."""
    return dict.fromkeys(_FILE_STAT_KEYS, 0)


def _read_text(path):
    """Return the whole content of *path*, decoded as UTF-8."""
    # An explicit encoding keeps the result independent of the locale.
    with open(path, "r", encoding="utf-8") as file_handle:
        return file_handle.read()


def read_files_into_dict(base_path, stats=None):
    """
    Reads files from the given base_path directory and stores their content in a dictionary.

    Only the optional ``Allrun`` script directly inside ``base_path`` and the
    files located exactly one directory level below ``base_path`` are read.

    NOTE: the returned dictionaries are keyed by bare file name, so when two
    sub-directories hold a file with the same name the one visited later
    overwrites the earlier entry (``file_names`` still lists both).

    Args:
        base_path: Case directory (``str`` or ``os.PathLike``).
        stats: Optional dictionary of counters to update in place. It must
            already contain the per-file counter keys; a new one is created
            when omitted.

    Returns:
        A tuple ``(allrun_content, file_contents, file_names, folder_names, stats)``:
        ``allrun_content`` is the text of ``Allrun`` or the string ``"None"``
        when it is missing or unreadable; ``file_contents`` maps file name to
        text; ``file_names`` lists the files read; ``folder_names`` maps file
        name to its folder relative to ``base_path``.
    """
    if stats is None:
        stats = _new_file_stats()

    base_path = os.fspath(base_path)  # accept pathlib.Path as well as str
    file_contents, file_names, folder_names = {}, [], {}
    base_depth = base_path.rstrip(os.sep).count(os.sep)

    # Read the 'Allrun' file, if there is one.
    allrun_path = os.path.join(base_path, "Allrun")
    allrun_content = "None"
    if os.path.isfile(allrun_path):
        stats["files_total_scanned"] += 1  # We are scanning the Allrun file
        try:
            allrun_content = _read_text(allrun_path)
        except UnicodeDecodeError:
            print(f"Skipping file due to encoding error: {allrun_path}")
            stats["files_skipped_encoding"] += 1
            stats["allrun_read_fail"] += 1
        except OSError as e:
            print(f"Error reading file {allrun_path}: {e}")
            stats["allrun_read_fail"] += 1
        else:
            stats["allrun_read_success"] += 1

    # Traverse the base_path directory to read files.
    for root, dirs, files in os.walk(base_path):
        # Only read files one level below the base_path.
        if root.rstrip(os.sep).count(os.sep) != base_depth + 1:
            continue
        # Nothing deeper is ever read, so stop os.walk from descending further.
        dirs[:] = []
        relative_folder = os.path.relpath(root, base_path)
        for file in files:
            file_path = os.path.join(root, file)
            stats["files_total_scanned"] += 1  # We are scanning this file
            try:
                content = _read_text(file_path)
            except UnicodeDecodeError:
                print(f"Skipping file due to encoding error: {file_path}")
                stats["files_skipped_encoding"] += 1
            except OSError as e:
                print(f"Error reading file {file_path}: {e}")
            else:
                file_contents[file] = content
                stats["files_read_success"] += 1
                folder_names[file] = relative_folder
                file_names.append(file)

    return allrun_content, file_contents, file_names, folder_names, stats


def _classify_case(case_dir, root_dir):
    """Derive ``(solver, category, domain)`` for a case from its location under root_dir."""
    solver, category, domain = None, None, None
    normalized_root = os.path.normpath(root_dir)

    # Move up to the parent directory and search up to 3 levels.
    current_path = os.path.dirname(case_dir)
    for level in range(3):
        # Stop if the path is empty or if we have reached root_dir. Whole
        # paths are compared (not basenames) so that a trailing separator on
        # root_dir or a same-named intermediate directory cannot confuse it.
        if not current_path or os.path.normpath(current_path) == normalized_root:
            break
        dir_name = os.path.basename(current_path)
        # A directory whose name ends with 'Foam' is treated as the solver;
        # its parent directory is considered the domain.
        if dir_name.endswith("Foam"):
            return dir_name, category, os.path.basename(os.path.dirname(current_path))
        if level == 0:
            category = dir_name
        current_path = os.path.dirname(current_path)

    # No solver directory ending with 'Foam' was found: use the relative path.
    category = None  # Reset category in case it was partially set above
    path_components = os.path.relpath(case_dir, root_dir).split(os.sep)
    if len(path_components) == 3:
        # domain/solver/caseName
        domain, solver = path_components[0], path_components[1]
    elif len(path_components) == 4:
        # domain/solver/category/caseName
        domain, solver, category = path_components[0], path_components[1], path_components[2]
    return solver, category, domain


def _restore_blockmeshdict(case_dir, case_name, allrun_content, resource_dir,
                           file_contents, file_names, folder_names):
    """Copy the shared blockMeshDict referenced by Allrun into the case when it is missing."""
    system_dir = os.path.join(case_dir, "system")
    blockmeshdict_path = os.path.join(system_dir, "blockMeshDict")
    if os.path.isfile(blockmeshdict_path):
        return

    # Only try if Allrun exists and was read.
    if allrun_content == "None":
        print(f"[INFO] No Allrun file to check for blockMeshDict reference in case {case_name}")
        return

    match = _BLOCKMESH_DICT_PATTERN.search(allrun_content)
    if not match:
        print(f"[INFO] No blockMesh -dict reference found in Allrun for case {case_name}")
        return

    src_blockmeshdict = os.path.join(resource_dir, match.group(1))
    if not os.path.isfile(src_blockmeshdict):
        print(f"[WARNING] Referenced blockMeshDict {src_blockmeshdict} not found for case {case_name}")
        return

    try:
        blockmesh_content = _read_text(src_blockmeshdict)
        # Save to the case's system dir.
        os.makedirs(system_dir, exist_ok=True)
        with open(blockmeshdict_path, "w", encoding="utf-8") as dst_f:
            dst_f.write(blockmesh_content)
    except (OSError, UnicodeError) as e:
        print(f"[WARNING] Failed to copy {src_blockmeshdict} to {blockmeshdict_path}: {e}")
        return

    # Add to in-memory structures for output.
    file_contents["blockMeshDict"] = blockmesh_content
    file_names.append("blockMeshDict")
    folder_names["blockMeshDict"] = "system"
    print(f"[INFO] Copied {src_blockmeshdict} to {blockmeshdict_path} for case {case_name}")


def find_cases(root_dir):
    """
    Traverse the directory tree under 'root_dir' and look for cases containing a 'system' folder.

    For each case found, extract metadata such as case name, solver, category,
    and domain. Statistics are collected in a "funnel-like" manner to see how
    many directories and files are processed or skipped.

    Side effect: when a case has no ``system/blockMeshDict`` but its Allrun
    script references one under ``$FOAM_TUTORIALS/resources/blockMesh``, that
    file is copied into the case's ``system`` directory on disk. The
    ``FOAM_TUTORIALS`` environment variable is used when set; otherwise
    ``root_dir`` itself is assumed to be the tutorials directory.

    Args:
        root_dir: Root of the tutorials tree (``str`` or ``os.PathLike``).

    Returns:
        A tuple ``(cases, stats)``. ``cases`` is a list of dictionaries with
        the keys ``case_name``, ``solver``, ``category``, ``domain``,
        ``folder_names``, ``file_names``, ``file_contents`` and ``allrun``;
        ``stats`` is the dictionary of counters.
    """
    cases = []
    stats = {
        "directories_scanned": 0,
        "directories_with_system": 0,
        **_new_file_stats(),
    }

    foam_tutorials = os.environ.get("FOAM_TUTORIALS") or os.fspath(root_dir)
    blockmesh_resource_dir = os.path.join(foam_tutorials, "resources", "blockMesh")

    for root, dirs, _files in os.walk(root_dir):
        stats["directories_scanned"] += 1  # Scanning this directory

        # A directory is a case when it contains a 'system' folder.
        if "system" not in dirs:
            continue
        stats["directories_with_system"] += 1

        # Read files in the current directory (root) with fresh counters,
        # then merge those counters into the global stats.
        allrun_content, file_contents, file_names, folder_names, file_stats = (
            read_files_into_dict(root, stats=_new_file_stats())
        )
        for key in _FILE_STAT_KEYS:
            stats[key] += file_stats[key]

        # The case name is the name of the current directory.
        case_name = os.path.basename(root)
        solver, category, domain = _classify_case(root, root_dir)

        _restore_blockmeshdict(
            root, case_name, allrun_content, blockmesh_resource_dir,
            file_contents, file_names, folder_names,
        )

        # Append the extracted metadata to the 'cases' list.
        cases.append({
            "case_name": case_name,
            "solver": solver,
            "category": category,
            "domain": domain,
            "folder_names": folder_names,
            "file_names": file_names,
            "file_contents": file_contents,
            "allrun": allrun_content,
        })

    return cases, stats


def save_cases_to_file(cases, output_dir):
    """
    Saves case details, summary, or Allrun content to a file.

    Four files are written into ``output_dir`` (created when missing):
    ``openfoam_allrun_scripts.txt``, ``openfoam_tutorials_structure.txt``,
    ``openfoam_tutorials_details.txt`` and ``openfoam_case_stats.json``.
    The JSON lists are sorted so that repeated runs give identical output.

    Args:
        cases: List of case dictionaries as produced by ``find_cases``.
        output_dir: Destination directory (``str`` or ``os.PathLike``).
    """
    os.makedirs(output_dir, exist_ok=True)
    allrun_filepath = os.path.join(output_dir, "openfoam_allrun_scripts.txt")
    tutorials_summary_filepath = os.path.join(output_dir, "openfoam_tutorials_structure.txt")
    tutorial_filepath = os.path.join(output_dir, "openfoam_tutorials_details.txt")
    case_stats_filepath = os.path.join(output_dir, "openfoam_case_stats.json")

    # Text is accumulated in lists and joined once at the end.
    allrun_parts, summary_parts, detail_parts = [], [], []
    domains, categories, solvers = set(), set(), set()

    for case in cases:
        case_name, case_domain, case_category, case_solver = (
            case["case_name"], case["domain"], case["category"], case["solver"]
        )
        if case_domain:
            domains.add(case_domain)
        if case_category:
            categories.add(case_category)
        if case_solver:
            solvers.add(case_solver)

        # The case index.
        case_index_text = (
            "\n"
            f"case name: {case_name}\n"
            f"case domain: {case_domain}\n"
            f"case category: {case_category}\n"
            f"case solver: {case_solver}\n"
            "\n\n"
        )

        # The directory structure: folder -> files, in first-seen order.
        folder_file_dict = {}
        for file_name, folder_name in case["folder_names"].items():
            folder_file_dict.setdefault(folder_name, []).append(file_name)

        structure_lines = [
            f"directory name: {folder_name}. "
            f"File names in this directory: [{', '.join(file_names)}]\n"
            for folder_name, file_names in folder_file_dict.items()
        ]
        dir_structure_text = "\n" + "".join(structure_lines) + "\n\n"

        # The Allrun content ("None" is the sentinel for "no Allrun script").
        case_allrun = case["allrun"]
        if case_allrun != "None":
            allrun_parts.append(
                f"\n{case_index_text}\n{dir_structure_text}\n{case_allrun}\n\n\n\n\n"
            )

        # The tutorials summary.
        summary_parts.append(f"\n{case_index_text}\n{dir_structure_text}\n\n\n")

        # The detailed tutorials.
        detail_parts.append(f"\n{case_index_text}\n{dir_structure_text}\n\n")
        for folder_name, file_names in folder_file_dict.items():
            detail_parts.append(f"directory name: {folder_name}\n")
            for file_name in file_names:
                detail_parts.append(f"file name: {file_name}\n")
                # Delete comments, such as license information, from the file contents.
                cleaned_text = re.sub(
                    r'/\*.*?\*/', '', case['file_contents'][file_name], flags=re.DOTALL
                )
                cleaned_text = re.sub(r'//.*', '', cleaned_text)
                detail_parts.append(f"{cleaned_text}\n")
                detail_parts.append("\n\n")
            detail_parts.append("\n\n")
        detail_parts.append("\n\n\n\n")

    with open(allrun_filepath, "w", encoding="utf-8") as file:
        file.write("".join(allrun_parts))

    with open(tutorials_summary_filepath, "w", encoding="utf-8") as file:
        file.write("".join(summary_parts))

    with open(tutorial_filepath, "w", encoding="utf-8") as file:
        file.write("".join(detail_parts))

    # "None" is always offered as a category for cases without one.
    categories.add("None")
    case_stats = {
        'case_domain': sorted(domains),
        'case_category': sorted(categories),
        'case_solver': sorted(solvers),
    }
    with open(case_stats_filepath, "w", encoding="utf-8") as file:
        json.dump(case_stats, file, ensure_ascii=False, indent=4)


def get_commands_from_directory(directory_path):
    """
    Retrieves all command file names from a specified directory using os.scandir.

    Args:
        directory_path: Directory to list.

    Returns:
        Sorted list of the names of the regular files in the directory.

    Raises:
        FileNotFoundError: If ``directory_path`` does not exist.
    """
    if not os.path.exists(directory_path):
        raise FileNotFoundError(f"The directory {directory_path} does not exist.")
    # The context manager closes the scandir iterator promptly; sorting makes
    # the result independent of the file system's listing order.
    with os.scandir(directory_path) as entries:
        return sorted(entry.name for entry in entries if entry.is_file())


def get_command_help(command, directory_path):
    """
    Retrieves the help message for a given command.

    Args:
        command: File name of the executable inside ``directory_path``.
        directory_path: Directory containing the executable.

    Returns:
        The command's stdout when it exits with status 0, otherwise its
        stderr; if the command cannot be started, the error message.
    """
    try:
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact. stdin is detached so a command that tries to
        # read input cannot block the run.
        result = subprocess.run(
            [os.path.join(directory_path, command), "-help"],
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
        )
        return result.stdout if result.returncode == 0 else result.stderr
    except Exception as e:
        return str(e)


def fetch_command_helps(commands, directory_path):
    """
    Fetch help messages in parallel.

    Args:
        commands: Iterable of command names.
        directory_path: Directory containing the executables.

    Returns:
        Dictionary mapping each command name to its help text.
    """
    # Materialize once: the names are consumed twice (keys and worker input).
    commands = list(commands)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        help_texts = executor.map(
            lambda cmd: get_command_help(cmd, directory_path), commands
        )
        return dict(zip(commands, help_texts))


def main():
    """Parse the command line, extract the tutorial cases and the command help texts."""
    # python ./database/script/tutorial_parser.py --output_dir=./database/raw --wm_project_dir=$WM_PROJECT_DIR
    parser = argparse.ArgumentParser()
    parser.add_argument("--wm_project_dir", required=True, help="Path to WM_PROJECT_DIR")
    parser.add_argument("--output_dir", default='./database', help="Directory to save output files")

    args = parser.parse_args()
    print(args)

    tutorial_path = os.path.join(args.wm_project_dir, "tutorials")
    cases_info, case_stats = find_cases(tutorial_path)
    print(f"Statistics: {case_stats}")
    print(f"Found {len(cases_info)} cases in {tutorial_path}")

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    save_cases_to_file(cases_info, output_dir)

    commands_path = Path(args.wm_project_dir) / "platforms/linux64GccDPInt32Opt/bin"
    commands = get_commands_from_directory(commands_path)
    command_help_data = fetch_command_helps(commands, commands_path)

    with open(output_dir / "openfoam_commands.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(commands) + "\n")

    with open(output_dir / "openfoam_command_help.txt", "w", encoding="utf-8") as f:
        for cmd, help_text in command_help_data.items():
            f.write(f"{cmd}{help_text}\n\n")


if __name__ == "__main__":
    main()