# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import shutil
import time
from pathlib import Path
import torch
from nemo.deploy.nlp.megatronllm_deployable import MegatronLLMDeployable
run_export_tests = True
try:
    from nemo.deploy import DeployPyTriton
    from nemo.deploy.nlp import NemoQueryLLM, NemoQueryLLMPyTorch
    from nemo.export.tensorrt_llm import TensorRTLLM
except Exception:
    # The deploy/export dependencies are optional; skip the export tests
    # when they are not installed.
    run_export_tests = False
def get_accuracy_with_lambada(model, nq, task_ids, lora_uids, test_data_path=None):
    # Accuracy test based on the Lambada dataset, which includes more than
    # 5000 sentences. The generated last token is compared against the
    # original text's last word. A prediction also counts under the relaxed
    # metric when either string is a prefix of the other, except when a
    # single-character generation is matched against a longer word.
if test_data_path is None:
raise Exception("test_data_path cannot be None.")
trtllm_correct = 0
trtllm_deployed_correct = 0
trtllm_correct_relaxed = 0
trtllm_deployed_correct_relaxed = 0
all_expected_outputs = []
all_trtllm_outputs = []
with open(test_data_path, 'r') as file:
records = json.load(file)
eval_start = time.perf_counter()
for record in records:
prompt = record["text_before_last_word"]
expected_output = record["last_word"].strip().lower()
trtllm_output = model.forward(
input_texts=[prompt],
max_output_len=1,
top_k=1,
top_p=0,
temperature=0.1,
task_ids=task_ids,
lora_uids=lora_uids,
)
trtllm_output = trtllm_output[0][0].strip().lower()
all_expected_outputs.append(expected_output)
all_trtllm_outputs.append(trtllm_output)
if expected_output == trtllm_output:
trtllm_correct += 1
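        # Relaxed match: also accept prefix overlaps in either direction,
        # excluding one-character generations matched against longer words.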
if (
expected_output == trtllm_output
or trtllm_output.startswith(expected_output)
or expected_output.startswith(trtllm_output)
):
if len(trtllm_output) == 1 and len(expected_output) > 1:
continue
trtllm_correct_relaxed += 1
if nq is not None:
trtllm_deployed_output = nq.query_llm(
prompts=[prompt],
max_output_len=1,
top_k=1,
top_p=0,
temperature=0.1,
task_id=task_ids,
)
trtllm_deployed_output = trtllm_deployed_output[0][0].strip().lower()
if expected_output == trtllm_deployed_output:
trtllm_deployed_correct += 1
if (
expected_output == trtllm_deployed_output
or trtllm_deployed_output.startswith(expected_output)
or expected_output.startswith(trtllm_deployed_output)
):
if len(trtllm_deployed_output) == 1 and len(expected_output) > 1:
continue
trtllm_deployed_correct_relaxed += 1
eval_end = time.perf_counter()
trtllm_accuracy = trtllm_correct / len(all_expected_outputs)
trtllm_accuracy_relaxed = trtllm_correct_relaxed / len(all_expected_outputs)
trtllm_deployed_accuracy = trtllm_deployed_correct / len(all_expected_outputs)
trtllm_deployed_accuracy_relaxed = trtllm_deployed_correct_relaxed / len(all_expected_outputs)
evaluation_time = eval_end - eval_start
return (
trtllm_accuracy,
trtllm_accuracy_relaxed,
trtllm_deployed_accuracy,
trtllm_deployed_accuracy_relaxed,
evaluation_time,
)
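# For reference, each record in the Lambada test file is expected to carry the
# two fields read in the loop above (a minimal sketch; the sentence content is
# illustrative only):
#
#   {"text_before_last_word": "He handed her the", "last_word": "book"}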
def run_in_framework_inference(
model_name,
prompt,
checkpoint_path,
n_gpu=1,
max_batch_size=None,
max_input_len=None,
max_output_len=None,
):
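    # Note: max_batch_size, max_input_len, and max_output_len are accepted for
    # interface parity with run_trt_llm_inference but are not used here.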
model = MegatronLLMDeployable(checkpoint_path, n_gpu)
nm = DeployPyTriton(
model=model,
triton_model_name=model_name,
http_port=8000,
)
nm.deploy()
nm.run()
nq = NemoQueryLLMPyTorch(url="localhost:8000", model_name=model_name)
output_deployed = nq.query_llm(
prompts=prompt,
)
print("Output: ", output_deployed)
nm.stop()
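    # Return a 5-tuple of Nones so the result shape matches run_trt_llm_inference.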
return None, None, None, None, None
def run_trt_llm_inference(
model_name,
model_type,
prompt,
checkpoint_path,
trt_llm_model_dir,
n_gpu=1,
max_batch_size=8,
use_embedding_sharing=False,
max_input_len=128,
max_output_len=128,
max_num_tokens=None,
ptuning=False,
p_tuning_checkpoint=None,
lora=False,
lora_checkpoint=None,
tp_size=None,
pp_size=None,
top_k=1,
top_p=0.0,
temperature=1.0,
run_accuracy=False,
debug=True,
streaming=False,
stop_words_list=None,
test_deployment=False,
test_data_path=None,
save_engine=False,
):
if Path(checkpoint_path).exists():
if n_gpu > torch.cuda.device_count():
print(
"Path: {0} and model: {1} with {2} gpus won't be tested since available # of gpus = {3}".format(
checkpoint_path, model_name, n_gpu, torch.cuda.device_count()
)
)
return None, None, None, None, None
Path(trt_llm_model_dir).mkdir(parents=True, exist_ok=True)
if debug:
print("")
print("")
print(
"################################################## NEW TEST ##################################################"
)
print("")
print("Path: {0} and model: {1} with {2} gpus will be tested".format(checkpoint_path, model_name, n_gpu))
prompt_embeddings_checkpoint_path = None
task_ids = None
max_prompt_embedding_table_size = 0
        if ptuning:
            if p_tuning_checkpoint is not None and Path(p_tuning_checkpoint).exists():
                prompt_embeddings_checkpoint_path = p_tuning_checkpoint
                max_prompt_embedding_table_size = 8192
                task_ids = ["0"]
                if debug:
                    print("---- PTuning enabled.")
            else:
                print("---- PTuning checkpoint not found; skipping the test.")
                return None, None, None, None, None
lora_ckpt_list = None
lora_uids = None
use_lora_plugin = None
lora_target_modules = None
        if lora:
            if lora_checkpoint is not None and Path(lora_checkpoint).exists():
                lora_ckpt_list = [lora_checkpoint]
                lora_uids = ["0", "-1", "0"]
                use_lora_plugin = "bfloat16"
                lora_target_modules = ["attn_qkv"]
                if debug:
                    print("---- LoRA enabled.")
            else:
                print("---- LoRA checkpoint not found; skipping the test.")
                return None, None, None, None, None
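        # Export the NeMo checkpoint to a TensorRT-LLM engine under trt_llm_model_dir.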
trt_llm_exporter = TensorRTLLM(trt_llm_model_dir, lora_ckpt_list, load_model=False)
trt_llm_exporter.export(
nemo_checkpoint_path=checkpoint_path,
model_type=model_type,
tensor_parallelism_size=tp_size,
pipeline_parallelism_size=pp_size,
max_input_len=max_input_len,
max_output_len=max_output_len,
max_batch_size=max_batch_size,
max_prompt_embedding_table_size=max_prompt_embedding_table_size,
use_lora_plugin=use_lora_plugin,
lora_target_modules=lora_target_modules,
max_num_tokens=max_num_tokens,
opt_num_tokens=60,
use_embedding_sharing=use_embedding_sharing,
)
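        # When p-tuning is enabled, register the prompt-embedding table under task id "0".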
if ptuning:
trt_llm_exporter.add_prompt_table(
task_name="0",
prompt_embeddings_checkpoint_path=prompt_embeddings_checkpoint_path,
)
output = trt_llm_exporter.forward(
input_texts=prompt,
max_output_len=max_output_len,
top_k=top_k,
top_p=top_p,
temperature=temperature,
task_ids=task_ids,
lora_uids=lora_uids,
streaming=streaming,
stop_words_list=stop_words_list,
)
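        # Exercise the standalone C++ runtime path only for plain engines,
        # i.e. when neither LoRA nor p-tuning artifacts are attached.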
if not use_lora_plugin and not ptuning:
test_cpp_runtime(
engine_path=trt_llm_model_dir,
prompt=prompt,
max_output_len=max_output_len,
debug=True,
)
nq = None
nm = None
output_deployed = ""
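        # Optionally serve the exported engine with Triton and query it over HTTP.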
if test_deployment:
nm = DeployPyTriton(
model=trt_llm_exporter,
triton_model_name=model_name,
http_port=8000,
)
nm.deploy()
nm.run()
nq = NemoQueryLLM(url="localhost:8000", model_name=model_name)
output_deployed = nq.query_llm(
prompts=prompt,
max_output_len=max_output_len,
top_k=1,
top_p=0.0,
temperature=1.0,
lora_uids=lora_uids,
)
if debug:
print("")
print("--- Prompt: ", prompt)
print("")
print("--- Output: ", output)
print("")
print("")
print("--- Output deployed: ", output_deployed)
print("")
if run_accuracy:
print("Start model accuracy testing ...")
result = get_accuracy_with_lambada(trt_llm_exporter, nq, task_ids, lora_uids, test_data_path)
if test_deployment:
nm.stop()
if not save_engine:
shutil.rmtree(trt_llm_model_dir)
return result
if test_deployment:
nm.stop()
if not save_engine:
shutil.rmtree(trt_llm_model_dir)
return None, None, None, None, None
else:
raise Exception("Checkpoint {0} could not be found.".format(checkpoint_path))
def test_cpp_runtime(
engine_path,
prompt,
max_output_len,
debug,
):
trt_llm_exporter = TensorRTLLM(engine_path, load_model=True)
output = trt_llm_exporter.forward(
input_texts=prompt,
max_output_len=max_output_len,
top_k=1,
top_p=0.0,
temperature=1.0,
)
if debug:
print("")
print("--- Output deployed with cpp runtime: ", output)
print("")
def get_args():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description=f"Deploy nemo models to Triton and benchmark the models",
)
parser.add_argument(
"--model_name",
type=str,
required=True,
)
parser.add_argument(
"--model_type",
type=str,
required=False,
)
parser.add_argument(
"--min_gpus",
type=int,
default=1,
)
parser.add_argument(
"--max_gpus",
type=int,
)
parser.add_argument(
"--checkpoint_dir",
type=str,
default="/tmp/nemo_checkpoint/",
required=False,
)
parser.add_argument(
"--trt_llm_model_dir",
type=str,
)
parser.add_argument(
"--max_batch_size",
type=int,
default=8,
)
parser.add_argument(
"--max_input_len",
type=int,
default=256,
)
parser.add_argument(
"--max_output_len",
type=int,
default=128,
)
parser.add_argument(
"--max_num_tokens",
type=int,
)
parser.add_argument(
"--p_tuning_checkpoint",
type=str,
)
parser.add_argument(
"--ptuning",
default=False,
action='store_true',
)
parser.add_argument(
"--lora_checkpoint",
type=str,
)
parser.add_argument(
"--lora",
default=False,
action='store_true',
)
parser.add_argument(
"--tp_size",
type=int,
default=1,
)
parser.add_argument(
"--pp_size",
type=int,
default=1,
)
parser.add_argument(
"--top_k",
type=int,
default=1,
)
parser.add_argument(
"--top_p",
type=float,
default=0.0,
)
parser.add_argument(
"--temperature",
type=float,
default=1.0,
)
parser.add_argument(
"--run_accuracy",
type=str,
default="False",
)
parser.add_argument("--streaming", default=False, action="store_true")
parser.add_argument(
"--test_deployment",
type=str,
default="False",
)
parser.add_argument(
"--debug",
default=False,
action='store_true',
)
parser.add_argument(
"--ci_upload_test_results_to_cloud",
default=False,
action='store_true',
)
parser.add_argument(
"--test_data_path",
type=str,
default=None,
)
parser.add_argument(
"-b",
'--backend',
nargs='?',
const=None,
default='TensorRT-LLM',
choices=['TensorRT-LLM', 'vLLM', 'In-Framework'],
help="Different options to deploy nemo model.",
)
parser.add_argument(
"--save_engine",
type=str,
default="False",
)
return parser.parse_args()
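# Example invocation (a sketch; the model name and all paths are placeholders):
#
#   python nemo_deploy.py --model_name llama --model_type llama \
#       --checkpoint_dir /tmp/nemo_checkpoint/ --trt_llm_model_dir /tmp/trt_llm_engine/ \
#       --min_gpus 1 --test_deployment True --run_accuracy True \
#       --test_data_path /path/to/lambada.json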
def run_inference_tests(args):
    # argparse receives these flags as strings; normalize them to booleans.
    args.test_deployment = args.test_deployment == "True"
    args.save_engine = args.save_engine == "True"
    args.run_accuracy = args.run_accuracy == "True"
if args.run_accuracy:
if args.test_data_path is None:
raise Exception("test_data_path param cannot be None.")
result_dic = {}
prompt_template = ["The capital of France is", "Largest animal in the sea is"]
n_gpus = args.min_gpus
if args.max_gpus is None:
args.max_gpus = args.min_gpus
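    # Run the tests at min_gpus, then double the GPU count until max_gpus is reached.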
while n_gpus <= args.max_gpus:
if args.backend.lower() == "tensorrt-llm":
result_dic[n_gpus] = run_trt_llm_inference(
model_name=args.model_name,
model_type=args.model_type,
prompt=prompt_template,
checkpoint_path=args.checkpoint_dir,
trt_llm_model_dir=args.trt_llm_model_dir,
n_gpu=n_gpus,
max_batch_size=args.max_batch_size,
max_input_len=args.max_input_len,
max_output_len=args.max_output_len,
max_num_tokens=args.max_num_tokens,
ptuning=args.ptuning,
p_tuning_checkpoint=args.p_tuning_checkpoint,
lora=args.lora,
lora_checkpoint=args.lora_checkpoint,
tp_size=args.tp_size,
pp_size=args.pp_size,
top_k=args.top_k,
top_p=args.top_p,
temperature=args.temperature,
run_accuracy=args.run_accuracy,
debug=args.debug,
streaming=args.streaming,
test_deployment=args.test_deployment,
test_data_path=args.test_data_path,
save_engine=args.save_engine,
)
else:
result_dic[n_gpus] = run_in_framework_inference(
model_name=args.model_name,
prompt=prompt_template,
checkpoint_path=args.checkpoint_dir,
n_gpu=n_gpus,
max_batch_size=args.max_batch_size,
max_input_len=args.max_input_len,
max_output_len=args.max_output_len,
)
n_gpus = n_gpus * 2
test_result = "PASS"
print_separator = False
print("============= Test Summary ============")
for i, results in result_dic.items():
        if results[0] is not None and results[1] is not None:
if print_separator:
print("---------------------------------------")
print(
"Number of GPUS: {}\n"
"Model Accuracy: {:.4f}\n"
"Relaxed Model Accuracy: {:.4f}\n"
"Deployed Model Accuracy: {:.4f}\n"
"Deployed Relaxed Model Accuracy: {:.4f}\n"
"Evaluation Time [s]: {:.2f}".format(i, *results)
)
print_separator = True
if results[1] < 0.5:
test_result = "FAIL"
print("=======================================")
print("TEST: " + test_result)
    if test_result == "FAIL":
        raise Exception("Relaxed model accuracy is below 0.5")
if __name__ == '__main__':
args = get_args()
run_inference_tests(args)