|
|
import json |
|
|
import os |
|
|
import shutil |
|
|
import subprocess |
|
|
from loguru import logger |
|
|
from multiprocessing import Pool, Lock, Value |
|
|
|
|
|
|
|
|
from dataloaders.ProblemState import ProblemStateROCm |
|
|
from tb_eval.evaluators.interface import get_evaluators |
|
|
from tb_eval.helpers.helper import extract_first_pytest_failure |
|
|
from tb_eval.perf.efficiency import get_perf_evaluators |
|
|
|
|
|
class ROCm: |
|
|
def __init__(self, |
|
|
statis_path, |
|
|
py_folder, |
|
|
instruction_path, |
|
|
log_root, |
|
|
py_interpreter='python3', |
|
|
result_path=None |
|
|
): |
|
|
self.statis_path = statis_path |
|
|
self.py_folder = py_folder |
|
|
self.instruction_path = instruction_path |
|
|
|
|
|
self.rocm_tests = True |
|
|
self.problem_states = self.load_ps() |
|
|
self.log_root = log_root |
|
|
|
|
|
|
|
|
self.evaluator = get_evaluators["rocm"]() |
|
|
self.perf_evaluator = get_perf_evaluators["rocm"]() |
|
|
logger.info("Custom tests path set to: {}".format(self.py_folder)) |
|
|
|
|
|
def load_ps(self,): |
|
|
problem_states = [] |
|
|
with open(self.instruction_path, "r", encoding='utf-8') as file: |
|
|
instructions = json.load(file) |
|
|
statis_data = json.loads(open(self.statis_path, 'r', encoding='utf-8').read()) |
|
|
|
|
|
for line in instructions: |
|
|
instruction = line["instruction"] |
|
|
label = line["label"] |
|
|
opname = line["opname"] |
|
|
g = label.replace("<|im_end|>", "").replace("<|EOT|>", "") |
|
|
tmp = False |
|
|
for item in statis_data: |
|
|
if g in item["label"]: |
|
|
file = item["file"] |
|
|
tmp = item |
|
|
break |
|
|
if tmp: statis_data.remove(tmp) |
|
|
|
|
|
path = os.path.join(self.py_folder, file) |
|
|
assert os.path.exists(path), f"{file} not exist!" |
|
|
test_code = open(path, "r", encoding="utf-8").read().split("#"*146)[-1] |
|
|
assert "def test_" in test_code, "" |
|
|
|
|
|
problemstate = ProblemStateROCm( |
|
|
instruction=instruction, |
|
|
label=label, |
|
|
test_code=test_code, |
|
|
filename=file, |
|
|
opname=opname, |
|
|
target_kernel_name=line.get("target_kernel_name", "") |
|
|
) |
|
|
problem_states.append(problemstate) |
|
|
return problem_states |
|
|
|
|
|
def __len__(self): |
|
|
return len(self.problem_states) |
|
|
|
|
|
def write_file(self, file_path, start_idx=0, datalen=None): |
|
|
os.makedirs(os.path.dirname(file_path), exist_ok=True) |
|
|
data_len = datalen if datalen is not None else len(self) |
|
|
with open(file_path, 'w') as f: |
|
|
for ps in self.problem_states[start_idx:(start_idx + data_len)]: |
|
|
output = { |
|
|
"instruction": ps.instruction, |
|
|
"label": ps.label, |
|
|
"file": ps.filename, |
|
|
"target_kernel_name": ps.target_kernel_name, |
|
|
"predict": ps.solution if ps.solution else "", |
|
|
"speedup": ps.speedup |
|
|
} |
|
|
f.write(json.dumps(output) + "\n") |
|
|
|
|
|
def test_opt_correctness(self, code, filename, opname, tmp_dir="temp", save_scripts=True, exe_dir="pass_exe", gpu_id=0): |
|
|
tmp_dir = os.path.join(self.log_root, tmp_dir) |
|
|
os.makedirs(tmp_dir, exist_ok=True) |
|
|
exe_dir = os.path.join(self.log_root, exe_dir) |
|
|
|
|
|
os.makedirs(exe_dir, exist_ok=True) |
|
|
logger.info(f"Testing correctness for {filename} in {tmp_dir}") |
|
|
|
|
|
try: |
|
|
log_root = os.path.abspath(os.path.join(tmp_dir, "tmp")) |
|
|
os.makedirs(log_root, exist_ok=True) |
|
|
|
|
|
|
|
|
exec_root_eval = os.path.abspath(os.path.join(tmp_dir, "exec_eval")) |
|
|
os.makedirs(exec_root_eval, exist_ok=True) |
|
|
|
|
|
call_status, exec_status, stdout, stderr = self.evaluator(code, log_root, exec_root_eval, filename, opname=opname, atol=1e-2, rtol=1e-2, custom_tests_path=self.py_folder, gpu_id=gpu_id) |
|
|
|
|
|
if exec_status and save_scripts: |
|
|
|
|
|
src_file = os.path.join(exec_root_eval, opname) |
|
|
dst_file = os.path.join(exe_dir, opname) |
|
|
if os.path.exists(src_file): |
|
|
shutil.copy(src_file, dst_file) |
|
|
|
|
|
|
|
|
return bool(call_status), bool(exec_status), stdout, stderr, stdout, stderr |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Exception during correctness test for {filename}: {e}") |
|
|
return False, False, None, str(e), None, str(e) |
|
|
finally: |
|
|
if os.path.exists(log_root): |
|
|
shutil.rmtree(log_root, ignore_errors=True) |
|
|
if os.path.exists(exec_root_eval): |
|
|
shutil.rmtree(exec_root_eval, ignore_errors=True) |
|
|
|
|
|
def run_perf_evaluation(self, exec_folder, gen_perf_folder, gpu_id=0): |
|
|
""" |
|
|
Runs the performance evaluation for ROCm using the tb_eval module. |
|
|
|
|
|
Args: |
|
|
exec_folder (str): The directory containing the correctly executed scripts. |
|
|
gen_perf_folder (str): The directory where performance JSON results will be stored. |
|
|
|
|
|
Returns: |
|
|
dict: A dictionary containing performance results, mapping filename to metrics. |
|
|
""" |
|
|
logger.info(f"Starting ROCm performance evaluation for kernels in: {exec_folder}") |
|
|
try: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
perf_results = self.perf_evaluator(exec_folder,gpu_id=gpu_id) |
|
|
except Exception as e: |
|
|
logger.error(f"Performance evaluation failed: {e}") |
|
|
return {} |
|
|
logger.success("ROCm performance evaluation completed successfully.") |
|
|
return perf_results |
|
|
except Exception as e: |
|
|
logger.error(f"ROCm performance evaluation failed: {e}") |
|
|
return {} |
|
|
|