| import os |
| import unittest |
|
|
| import torch |
| import torch.multiprocessing as mp |
| from accelerate.utils import set_seed |
|
|
| from specforge.distributed import init_distributed |
| from specforge.modeling.target.eagle3_target_model import ( |
| CustomEagle3TargetModel, |
| HFEagle3TargetModel, |
| SGLangEagle3TargetModel, |
| ) |
| from tests.utils import get_available_port |
|
|
|
|
def _max_abs_diff(lhs, rhs):
    """Return the largest element-wise absolute difference as a Python float.

    Both tensors are cast to float32 before subtracting so the result is
    defined whatever dtype the inputs have (integer, bool, half, ...).
    """
    return (lhs.float() - rhs.float()).abs().max().item()


@torch.no_grad()
def test_target_model_backend(rank, world_size, port, tp_size):
    """Compare the HF, custom and SGLang Eagle3 target-model backends.

    Intended to run inside one worker started by ``mp.spawn``. The worker
    exports the distributed environment variables, calls
    ``init_distributed``, seeds the RNG, and feeds the same random batch to
    each backend in turn, asserting that the generated data agrees.

    Args:
        rank: Index of this worker; exported as both ``RANK`` and
            ``LOCAL_RANK``.
        world_size: Total number of workers; exported as ``WORLD_SIZE``.
        port: Port exported as ``MASTER_PORT``.
        tp_size: Forwarded to ``init_distributed(tp_size=...)``.

    Raises:
        AssertionError: If any compared field differs beyond its tolerance.
    """
    os.environ["RANK"] = str(rank)
    os.environ["LOCAL_RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = str(port)

    init_distributed(tp_size=tp_size)
    set_seed(42)

    # One shared batch so every backend sees identical inputs.
    input_ids = torch.randint(0, 1000, (2, 256)).cuda()
    attention_mask = torch.ones_like(input_ids)
    loss_mask = torch.ones_like(input_ids)

    # --- HF backend (used as the reference for both comparisons) ---
    hf_target_model = HFEagle3TargetModel.from_pretrained(
        "unsloth/Llama-3.2-1B", torch_dtype=torch.float16, device="cuda"
    )
    hf_target_model.set_aux_hidden_states_layers()
    hf_out = hf_target_model.generate_eagle3_data(
        input_ids=input_ids,
        attention_mask=attention_mask,
        loss_mask=loss_mask,
    )
    # Drop the reference to the model before loading the next backend.
    del hf_target_model

    # --- custom backend ---
    custom_target_model = CustomEagle3TargetModel.from_pretrained(
        "unsloth/Llama-3.2-1B", torch_dtype=torch.float16, device="cuda"
    )
    custom_target_model.set_aux_hidden_states_layers()
    custom_out = custom_target_model.generate_eagle3_data(
        input_ids=input_ids,
        attention_mask=attention_mask,
        loss_mask=loss_mask,
    )
    del custom_target_model

    # The failure messages are built from the same attribute that was
    # compared, so a mismatch reports the right field instead of failing
    # while formatting the message.
    assert torch.allclose(
        hf_out.target, custom_out.target, atol=1e-5, rtol=1e-5
    ), (
        "HF vs custom: target is not close, max abs diff: "
        f"{_max_abs_diff(hf_out.target, custom_out.target)}"
    )
    assert torch.allclose(
        hf_out.loss_mask, custom_out.loss_mask, atol=1e-5, rtol=1e-5
    ), (
        "HF vs custom: loss_mask is not close, max abs diff: "
        f"{_max_abs_diff(hf_out.loss_mask, custom_out.loss_mask)}"
    )
    assert torch.allclose(
        hf_out.input_ids, custom_out.input_ids, atol=1e-5, rtol=1e-5
    ), (
        "HF vs custom: input_ids is not close, max abs diff: "
        f"{_max_abs_diff(hf_out.input_ids, custom_out.input_ids)}"
    )
    assert torch.allclose(
        hf_out.hidden_states, custom_out.hidden_states, atol=1e-5, rtol=1e-5
    ), (
        "HF vs custom: hidden_states is not close, max abs diff: "
        f"{_max_abs_diff(hf_out.hidden_states, custom_out.hidden_states)}"
    )

    # --- SGLang backend ---
    sgl_target_model = SGLangEagle3TargetModel.from_pretrained(
        "unsloth/Llama-3.2-1B", torch_dtype=torch.float16, device="cuda"
    )
    sgl_target_model.set_aux_hidden_states_layers()
    sgl_out = sgl_target_model.generate_eagle3_data(
        input_ids=input_ids, attention_mask=attention_mask, loss_mask=loss_mask
    )
    del sgl_target_model

    # Masks and ids must match exactly; the floating-point outputs are
    # compared with a much looser tolerance than the HF/custom pair above.
    assert torch.equal(hf_out.loss_mask, sgl_out.loss_mask)
    assert torch.equal(hf_out.input_ids, sgl_out.input_ids)
    assert torch.allclose(
        hf_out.hidden_states, sgl_out.hidden_states, atol=1e-1, rtol=1e-2
    ), (
        "HF vs SGLang: hidden_states is not close, max abs diff: "
        f"{_max_abs_diff(hf_out.hidden_states, sgl_out.hidden_states)}"
    )
    # The SGLang target is cast to half before comparing; the message applies
    # the same cast so the reported diff matches the values that were checked.
    assert torch.allclose(
        hf_out.target, sgl_out.target.half(), atol=1e-1, rtol=1e-2
    ), (
        "HF vs SGLang: target is not close, max abs diff: "
        f"{_max_abs_diff(hf_out.target, sgl_out.target.half())}"
    )
|
|
|
|
class TestTargetModelBackend(unittest.TestCase):
    """Runs ``test_target_model_backend`` in two spawned worker processes."""

    def _launch_workers(self, tp_size):
        """Spawn two workers on a freshly picked port with the given tp_size."""
        num_workers = 2
        master_port = get_available_port()
        mp.spawn(
            test_target_model_backend,
            args=(num_workers, master_port, tp_size),
            nprocs=num_workers,
        )

    def test_target_model_backend_dp(self):
        """Two workers with ``tp_size=1``."""
        self._launch_workers(1)

    def test_target_model_backend_tp(self):
        """Two workers with ``tp_size=2``."""
        self._launch_workers(2)
|
|
|
|
if __name__ == "__main__":
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
    # TestLoader.loadTestsFromTestCase() is the supported way to collect the
    # test methods of a TestCase class.
    suite = unittest.TestSuite()
    suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(TestTargetModelBackend)
    )
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)
    # Reflect the outcome in the exit status so a failing run is not
    # reported as success to the calling shell or CI job.
    raise SystemExit(0 if result.wasSuccessful() else 1)
|
|